home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / executor / n_hash.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  26.9 KB  |  1,012 lines

  1. /* ----------------------------------------------------------------
  2.  *   FILE
  3.  *    hash.c
  4.  *    
  5.  *   DESCRIPTION
  6.  *    Routines to hash relations for hashjoin
  7.  *
  8.  *   INTERFACE ROUTINES
  9.  *         ExecHash    - generate an in-memory hash table of the relation 
  10.  *         ExecInitHash    - initialize node and subnodes..
  11.  *         ExecEndHash    - shutdown node and subnodes
  12.  *
  13.  *   NOTES
  14.  *    
  15.  *   IDENTIFICATION
  16.  *    $Header: /private/postgres/src/executor/RCS/n_hash.c,v 1.17 1992/08/19 01:10:30 mer Exp $
  17.  * ----------------------------------------------------------------
  18.  */
  19.  
  20. #include <math.h>
  21. #include <sys/file.h>
  22. #include "storage/ipci.h"
  23. #include "storage/bufmgr.h"    /* for BLCKSZ */
  24. #include "tcop/slaves.h"
  25. #include "executor/executor.h"
  26.  
  27.  RcsId("$Header: /private/postgres/src/executor/RCS/n_hash.c,v 1.17 1992/08/19 01:10:30 mer Exp $");
  28.  
  29.  
  30. extern int NBuffers;
  31. static int HashTBSize;
  32.  
  33. /* ----------------------------------------------------------------
  34.  *       ExecHash
  35.  *
  36.  *    build hash table for hashjoin, all do partitioning if more
  37.  *    than one batches are required.
  38.  * ----------------------------------------------------------------
  39.  */
  40. /**** xxref:
  41.  *           ExecProcNode
  42.  ****/
  43. TupleTableSlot
  44. ExecHash(node)
  45. Hash node;
  46. {
  47.     EState      estate;
  48.     HashState      hashstate;
  49.     Plan       outerNode;
  50.     Var            hashkey;
  51.     HashJoinTable hashtable;
  52.     HeapTuple     heapTuple;
  53.     TupleTableSlot slot;
  54.     ExprContext      econtext;
  55.  
  56.     int          nbatch;
  57.     File      *batches;
  58.     RelativeAddr  *batchPos;
  59.     int          *batchSizes;
  60.     char      *tempname;
  61.     int          i;
  62.     RelativeAddr  *innerbatchNames;
  63.     
  64.     /* ----------------
  65.      *    get state info from node
  66.      * ----------------
  67.      */
  68.     
  69.     hashstate =   get_hashstate(node);
  70.     estate =      (EState) get_state((Plan) node);
  71.     outerNode =   get_outerPlan((Plan) node);
  72.  
  73.     if (!IsMaster && ParallelExecutorEnabled()) {
  74.     IpcMemoryId  shmid;
  75.     IpcMemoryKey hashtablekey;
  76.     int         hashtablesize;
  77.     int         tmpfd;
  78.  
  79.     hashtablekey = get_hashtablekey(node);
  80.     hashtablesize = get_hashtablesize(node);
  81.     
  82.     /* ----------------
  83.      *  in Sequent version, shared memory is implemented by 
  84.      *  memory mapped files, it takes one file descriptor.
  85.      *  we may have to free one for this.
  86.      * ----------------
  87.      */
  88.     closeOneVfd();
  89.         shmid = IpcMemoryCreateWithoutOnExit(hashtablekey,
  90.                             hashtablesize,
  91.                             HASH_PERMISSION);
  92.         hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
  93.     set_hashtable(node, hashtable);
  94.     }
  95.     
  96.     hashtable =    get_hashtable(node);
  97.     if (hashtable == NULL)
  98.     elog(WARN, "ExecHash: hash table is NULL.");
  99.     
  100.     nbatch = hashtable->nbatch;
  101.     
  102.     if (nbatch > 0) {  /* if needs hash partition */
  103.     innerbatchNames = (RelativeAddr *) ABSADDR(hashtable->innerbatchNames);
  104.  
  105.     /* --------------
  106.      *  allocate space for the file descriptors of batch files
  107.      *  then open the batch files in the current processes.
  108.      * --------------
  109.      */
  110.     batches = (File*)palloc(nbatch * sizeof(File));
  111.     for (i=0; i<nbatch; i++) {
  112.         batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
  113.                      O_CREAT | O_RDWR, 0600);
  114.     }
  115.     set_hashBatches(hashstate, batches);
  116.         batchPos = (RelativeAddr*) ABSADDR(hashtable->innerbatchPos);
  117.         batchSizes = (int*) ABSADDR(hashtable->innerbatchSizes);
  118.     }
  119.  
  120.     /* ----------------
  121.      *    set expression context
  122.      * ----------------
  123.      */
  124.     hashkey = get_hashkey(node);
  125.     econtext = get_cs_ExprContext((CommonState) hashstate);
  126.  
  127.     /* ----------------
  128.      *    get tuple and insert into the hash table
  129.      * ----------------
  130.      */
  131.     for (;;) {
  132.     slot = ExecProcNode(outerNode);
  133.     if (TupIsNull((Pointer)slot))
  134.         break;
  135.     
  136.     set_ecxt_innertuple(econtext, slot);
  137.     ExecHashTableInsert(hashtable, econtext, hashkey, 
  138.                 get_hashBatches(hashstate));
  139.  
  140.     ExecClearTuple((Pointer) slot);
  141.     }
  142.     
  143.     /*
  144.      * end of build phase, flush all the last pages of the batches.
  145.      */
  146.     for (i=0; i<nbatch; i++) {
  147.     if (FileSeek(batches[i], 0L, L_XTND) < 0)
  148.         perror("FileSeek");
  149.     if (FileWrite(batches[i],ABSADDR(hashtable->batch)+i*BLCKSZ,BLCKSZ) < 0)
  150.         perror("FileWrite");
  151.     NDirectFileWrite++;
  152.    }
  153.     
  154.     /* ---------------------
  155.      *  Return the slot so that we have the tuple descriptor 
  156.      *  when we need to save/restore them.  -Jeff 11 July 1991
  157.      * ---------------------
  158.      */
  159.     return slot;
  160. }
  161.  
  162. /* ----------------------------------------------------------------
  163.  *       ExecInitHash
  164.  *
  165.  *       Init routine for Hash node
  166.  * ----------------------------------------------------------------
  167.  */
  168. /**** xxref:
  169.  *           ExecInitNode
  170.  ****/
  171. List
  172. ExecInitHash(node, estate, parent)
  173.     Hash     node;
  174.     EState     estate;
  175.     Plan    parent;
  176. {
  177.     HashState        hashstate;
  178.     Plan        outerPlan;
  179.     
  180.     SO1_printf("ExecInitHash: %s\n",
  181.            "initializing hash node");
  182.     
  183.     /* ----------------
  184.      *  assign the node's execution state
  185.      * ----------------
  186.      */
  187.     set_state((Plan) node,  (EStatePtr)estate);
  188.     
  189.     /* ----------------
  190.      * create state structure
  191.      * ----------------
  192.      */
  193.     hashstate = MakeHashState(NULL);
  194.     set_hashstate(node, hashstate);
  195.     set_hashBatches(hashstate, NULL);
  196.     
  197.     /* ----------------
  198.      *  Miscellanious initialization
  199.      *
  200.      *         +    assign node's base_id
  201.      *       +    assign debugging hooks and
  202.      *       +    create expression context for node
  203.      * ----------------
  204.      */
  205.     ExecAssignNodeBaseInfo(estate, (BaseNode) hashstate, parent);
  206.     ExecAssignDebugHooks((Plan) node, (BaseNode) hashstate);
  207.     ExecAssignExprContext(estate, (CommonState) hashstate);
  208.     
  209. #define HASH_NSLOTS 1
  210.     /* ----------------
  211.      * initialize our result slot
  212.      * ----------------
  213.      */
  214.     ExecInitResultTupleSlot(estate, (CommonState) hashstate);
  215.     
  216.     /* ----------------
  217.      * initializes child nodes
  218.      * ----------------
  219.      */
  220.     outerPlan = get_outerPlan((Plan) node);
  221.     ExecInitNode(outerPlan, estate, (Plan) node);
  222.     
  223.     /* ----------------
  224.      *     initialize tuple type. no need to initialize projection
  225.      *  info because this node doesn't do projections
  226.      * ----------------
  227.      */
  228.     ExecAssignResultTypeFromOuterPlan((Plan) node, (CommonState) hashstate);
  229.     set_cs_ProjInfo((CommonState) hashstate, NULL);
  230.  
  231.     return
  232.     LispTrue;
  233. }
  234.  
  235. int
  236. ExecCountSlotsHash(node)
  237.     Plan node;
  238. {
  239.     return ExecCountSlotsNode(get_outerPlan(node)) +
  240.        ExecCountSlotsNode(get_innerPlan(node)) +
  241.        HASH_NSLOTS;
  242. }
  243.  
  244. /* ---------------------------------------------------------------
  245.  *       ExecEndHash
  246.  *
  247.  *    clean up routine for Hash node
  248.  * ----------------------------------------------------------------
  249.  */
  250.  
  251. /**** xxref:
  252.  *           ExecEndNode
  253.  ****/
  254. void
  255. ExecEndHash(node)
  256.     Hash node;
  257. {
  258.     HashState        hashstate;
  259.     Plan        outerPlan;
  260.     File        *batches;
  261.     
  262.     /* ----------------
  263.      *    get info from the hash state 
  264.      * ----------------
  265.      */
  266.     hashstate = get_hashstate(node);
  267.     batches = get_hashBatches(hashstate);
  268.     if (batches != NULL)
  269.        pfree(batches);
  270.  
  271.     /* ----------------
  272.      *    free projection info.  no need to free result type info
  273.      *  because that came from the outer plan...
  274.      * ----------------
  275.      */
  276.     ExecFreeProjectionInfo((CommonState) hashstate);
  277.     
  278.     /* ----------------
  279.      *    shut down the subplan
  280.      * ----------------
  281.      */
  282.     outerPlan = get_outerPlan((Plan) node);
  283.     ExecEndNode(outerPlan);
  284.  
  285. RelativeAddr
  286. hashTableAlloc(size, hashtable)
  287. int size;
  288. HashJoinTable hashtable;
  289. {
  290.     RelativeAddr p;
  291.     p = hashtable->top;
  292.     hashtable->top += size;
  293.     return p;
  294. }
  295.  
  296. /* ----------------------------------------------------------------
  297.  *    ExecHashTableCreate
  298.  *
  299.  *    create a hashtable in shared memory for hashjoin.
  300.  * ----------------------------------------------------------------
  301.  */
  302. #define NTUP_PER_BUCKET        10
  303. #define FUDGE_FAC        1.5
  304.  
  305. HashJoinTable
  306. ExecHashTableCreate(node)
  307.     Plan node;
  308. {
  309.     Plan      outerNode;
  310.     int          nbatch;
  311.     int       ntuples;
  312.     int       tupsize;
  313.     IpcMemoryId   shmid;
  314.     HashJoinTable hashtable;
  315.     HashBucket       bucket;
  316.     int       nbuckets;
  317.     int          totalbuckets;
  318.     int       bucketsize;
  319.     int       i;
  320.     int          tmpfd;
  321. #ifdef sequent
  322.     slock_t       *batchLock;
  323. #endif
  324.     RelativeAddr  *outerbatchNames;
  325.     RelativeAddr  *outerbatchPos;
  326.     RelativeAddr  *innerbatchNames;
  327.     RelativeAddr  *innerbatchPos;
  328.     int          *innerbatchSizes;
  329.     RelativeAddr  tempname;
  330.  
  331.     nbatch = -1;
  332.     HashTBSize = NBuffers/2;
  333.     while (nbatch < 0) {
  334.     /*
  335.      * determine number of batches for the hashjoin
  336.      */
  337.     HashTBSize *= 2;
  338.     nbatch = ExecHashPartition((Hash) node);
  339.       }
  340.     /* ----------------
  341.      *    get information about the size of the relation
  342.      * ----------------
  343.      */
  344.     outerNode = get_outerPlan(node);
  345.     ntuples = get_plan_size(outerNode);
  346.     if (ntuples <= 0)
  347.     ntuples = 1000;  /* XXX just a hack */
  348.     tupsize = get_plan_width(outerNode) + sizeof(HeapTupleData);
  349.  
  350.     /*
  351.      * totalbuckets is the total number of hash buckets needed for
  352.      * the entire relation
  353.      */
  354.     totalbuckets = ceil((double)ntuples/NTUP_PER_BUCKET);
  355.     bucketsize = NTUP_PER_BUCKET * tupsize + sizeof(*bucket);
  356.     
  357.     /*
  358.      * nbuckets is the number of hash buckets for the first pass
  359.      * of hybrid hashjoin
  360.      */
  361.     nbuckets = (HashTBSize - nbatch) * BLCKSZ / (bucketsize * FUDGE_FAC);
  362.     if (totalbuckets < nbuckets)
  363.        totalbuckets = nbuckets;
  364.     if (nbatch == 0)
  365.        nbuckets = totalbuckets;
  366. #ifdef HJDEBUG
  367.     printf("nbatch = %d, totalbuckets = %d, nbuckets = %d\n", nbatch, totalbuckets, nbuckets);
  368. #endif
  369.  
  370. #ifdef sequent
  371.     /* ----------------
  372.      *    in Sequent version, shared memory is implemented by 
  373.      *  memory mapped files, it takes one file descriptor.
  374.      *  we may have to free one for this.
  375.      * ----------------
  376.      */
  377.     closeOneVfd();
  378.     if (ParallelExecutorEnabled()) {
  379.        IpcMemoryKey hashtablekey;
  380.        int        hashtablesize;
  381.        hashtablekey = get_hashtablekey((Hash)node);
  382.        hashtablesize = (HashTBSize+SlaveLocalInfoD.nparallel)*BLCKSZ;
  383.        shmid = IpcMemoryCreateWithoutOnExit(hashtablekey,
  384.                             hashtablesize,
  385.                             HASH_PERMISSION);
  386.        set_hashtablesize((Hash)node, hashtablesize);
  387.       }
  388.     else
  389.        shmid = IpcMemoryCreateWithoutOnExit(PrivateIPCKey,
  390.                      (HashTBSize+1)*BLCKSZ,HASH_PERMISSION);
  391.     hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
  392. #else /* sequent */
  393.     /* ----------------
  394.      *  in non-parallel machines, we don't need to put the hash table
  395.      *  in the shared memory.  We just palloc it.
  396.      * ----------------
  397.      */
  398.     hashtable = (HashJoinTable)palloc((HashTBSize+1)*BLCKSZ);
  399. #endif /* sequent */
  400.  
  401.     if (hashtable == NULL) {
  402.     elog(WARN, "not enough memory for hashjoin.");
  403.       }
  404.     /* ----------------
  405.      *    initialize the hash table header
  406.      * ----------------
  407.      */
  408.     hashtable->nbuckets = nbuckets;
  409.     hashtable->totalbuckets = totalbuckets;
  410.     hashtable->bucketsize = bucketsize;
  411.     hashtable->shmid = shmid;
  412.     hashtable->top = sizeof(HashTableData);
  413. #ifdef sequent
  414.     S_INIT_LOCK(&(hashtable->overflowlock));
  415. #endif
  416.     hashtable->bottom = HashTBSize * BLCKSZ;
  417.     /*
  418.      *  hashtable->readbuf has to be long aligned!!!
  419.      */
  420.     hashtable->readbuf = hashtable->bottom;
  421.     hashtable->nbatch = nbatch;
  422.     hashtable->curbatch = 0;
  423.     hashtable->pcount = hashtable->nprocess = SlaveLocalInfoD.nparallel;
  424. #ifdef sequent
  425.     S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
  426.     S_INIT_LOCK(&(hashtable->tableLock));
  427. #endif
  428.     if (nbatch > 0) {
  429. #ifdef sequent
  430.     /* ---------------
  431.      *  allocate and initialize locks for each batch
  432.      * ---------------
  433.      */
  434.     batchLock = (slock_t*)ABSADDR(
  435.                hashTableAlloc(nbatch * sizeof(slock_t), hashtable));
  436.     for (i=0; i<nbatch; i++) 
  437.         S_INIT_LOCK(&(batchLock[i]));
  438.     hashtable->batchLock = RELADDR(batchLock);
  439. #endif
  440.     /* ---------------
  441.      *  allocate and initialize the outer batches
  442.      * ---------------
  443.      */
  444.     outerbatchNames = (RelativeAddr*)ABSADDR(
  445.         hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
  446.     outerbatchPos = (RelativeAddr*)ABSADDR(
  447.             hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
  448.     for (i=0; i<nbatch; i++) {
  449.         tempname = hashTableAlloc(12, hashtable);
  450.         mk_hj_temp(ABSADDR(tempname));
  451.         outerbatchNames[i] = tempname;
  452.         outerbatchPos[i] = -1;
  453.       }
  454.     hashtable->outerbatchNames = RELADDR(outerbatchNames);
  455.     hashtable->outerbatchPos = RELADDR(outerbatchPos);
  456.     /* ---------------
  457.      *  allocate and initialize the inner batches
  458.      * ---------------
  459.      */
  460.     innerbatchNames = (RelativeAddr*)ABSADDR(
  461.            hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
  462.     innerbatchPos = (RelativeAddr*)ABSADDR(
  463.            hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
  464.     innerbatchSizes = (int*)ABSADDR(
  465.                  hashTableAlloc(nbatch * sizeof(int), hashtable));
  466.     for (i=0; i<nbatch; i++) {
  467.         tempname = hashTableAlloc(12, hashtable);
  468.         mk_hj_temp(ABSADDR(tempname));
  469.         innerbatchNames[i] = tempname;
  470.         innerbatchPos[i] = -1;
  471.         innerbatchSizes[i] = 0;
  472.       }
  473.     hashtable->innerbatchNames = RELADDR(innerbatchNames);
  474.     hashtable->innerbatchPos = RELADDR(innerbatchPos);
  475.     hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
  476.     }
  477.     else {
  478.     hashtable->outerbatchNames = (RelativeAddr)NULL;
  479.     hashtable->outerbatchPos = (RelativeAddr)NULL;
  480.     hashtable->innerbatchNames = (RelativeAddr)NULL;
  481.     hashtable->innerbatchPos = (RelativeAddr)NULL;
  482.     hashtable->innerbatchSizes = (RelativeAddr)NULL;
  483.     }
  484.  
  485.     hashtable->batch = (RelativeAddr)LONGALIGN(hashtable->top + 
  486.                            bucketsize * nbuckets);
  487.     hashtable->overflownext=hashtable->batch + nbatch * BLCKSZ;
  488.     /* ----------------
  489.      *    initialize each hash bucket
  490.      * ----------------
  491.      */
  492.     bucket = (HashBucket)ABSADDR(hashtable->top);
  493.     for (i=0; i<nbuckets; i++) {
  494.     bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
  495.     bucket->bottom = bucket->top;
  496.     bucket->firstotuple = bucket->lastotuple = -1;
  497. #ifdef sequent
  498.     S_INIT_LOCK(&(bucket->bucketlock));
  499. #endif
  500.     bucket = (HashBucket)((char*)bucket + bucketsize);
  501.     }
  502.     return(hashtable);
  503. }
  504.  
  505. /* ----------------------------------------------------------------
  506.  *    ExecHashTableInsert
  507.  *
  508.  *    insert a tuple into the hash table depending on the hash value
  509.  *    it may just go to a tmp file for other batches
  510.  * ----------------------------------------------------------------
  511.  */
  512. void
  513. ExecHashTableInsert(hashtable, econtext, hashkey, batches)
  514.     HashJoinTable hashtable;
  515.     ExprContext   econtext;
  516.     Var       hashkey;
  517.     File      *batches;
  518. {
  519.     TupleTableSlot    slot;
  520.     HeapTuple         heapTuple;
  521.     Const         hashvalue;
  522.     HashBucket         bucket;
  523.     int            bucketno;
  524.     int            nbatch;
  525.     int            batchno;
  526.     char        *buffer;
  527.     RelativeAddr    *batchPos;
  528.     int            *batchSizes;
  529. #ifdef sequent
  530.     slock_t        *batchLock;
  531. #endif
  532.     char        *pos;
  533.  
  534.     nbatch =         hashtable->nbatch;
  535.     batchPos =         (RelativeAddr*)ABSADDR(hashtable->innerbatchPos);
  536.     batchSizes =     (int*)ABSADDR(hashtable->innerbatchSizes);
  537. #ifdef sequent
  538.     batchLock =     (slock_t*)ABSADDR(hashtable->batchLock);
  539. #endif
  540.  
  541.     slot = get_ecxt_innertuple(econtext);
  542.     heapTuple = (HeapTuple) CAR(slot);
  543.     
  544. #ifdef HJDEBUG
  545.     printf("Inserting ");
  546. #endif
  547.     
  548.     bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
  549.  
  550.     /* ----------------
  551.      *    decide whether to put the tuple in the hash table or a tmp file
  552.      * ----------------
  553.      */
  554.     if (bucketno < hashtable->nbuckets) {
  555.     /* ---------------
  556.      *  put the tuple in hash table
  557.      * ---------------
  558.      */
  559.     bucket = (HashBucket)
  560.         (ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
  561. #ifdef sequent
  562.     /* --------------
  563.      *  lock the hash bucket
  564.      * ---------------
  565.      */
  566.         if (ParallelExecutorEnabled())
  567.        S_LOCK(&(bucket->bucketlock));
  568. #endif
  569.     if ((char*)LONGALIGN(ABSADDR(bucket->bottom))
  570.          -(char*)bucket+heapTuple->t_len > hashtable->bucketsize)
  571.         ExecHashOverflowInsert(hashtable, bucket, heapTuple);
  572.     else {
  573.         bcopy(heapTuple,(char*)LONGALIGN(ABSADDR(bucket->bottom)),
  574.           heapTuple->t_len); 
  575.         bucket->bottom = 
  576.         ((RelativeAddr)LONGALIGN(bucket->bottom) + heapTuple->t_len);
  577.     }
  578. #ifdef sequent
  579.     /* --------------
  580.      *  insertion done, unlock the hash bucket
  581.      * --------------
  582.      */
  583.         if (ParallelExecutorEnabled())
  584.        S_UNLOCK(&(bucket->bucketlock));
  585. #endif
  586.     }
  587.     else {
  588.     /* -----------------
  589.      * put the tuple into a tmp file for other batches
  590.      * -----------------
  591.      */
  592.     batchno = (float)(bucketno - hashtable->nbuckets)/
  593.           (float)(hashtable->totalbuckets - hashtable->nbuckets)
  594.           * nbatch;
  595.     buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
  596. #ifdef sequent
  597.     /* ----------------
  598.      *  lock the batch
  599.      * ----------------
  600.      */
  601.     if (ParallelExecutorEnabled())
  602.         S_LOCK(&(batchLock[batchno]));
  603. #endif
  604.     batchSizes[batchno]++;
  605.     pos= (char *)
  606.         ExecHashJoinSaveTuple(heapTuple,
  607.                   buffer,
  608.                   batches[batchno],
  609.                   ABSADDR(batchPos[batchno]));
  610.     batchPos[batchno] = RELADDR(pos);
  611. #ifdef sequent
  612.     /* -----------------
  613.      *  unlock the batch
  614.      * -----------------
  615.      */
  616.     if (ParallelExecutorEnabled())
  617.         S_UNLOCK(&(batchLock[batchno]));
  618. #endif
  619.     }
  620. }
  621.  
  622. /* ----------------------------------------------------------------
  623.  *    ExecHashTableDestroy
  624.  *
  625.  *    destroy a hash table
  626.  * ----------------------------------------------------------------
  627.  */
  628. void
  629. ExecHashTableDestroy(hashtable)
  630.     HashJoinTable hashtable;
  631. {
  632. #ifdef sequent
  633.     IPCPrivateMemoryKill(0, hashtable->shmid);
  634. #else /* sequent */
  635.     pfree((Pointer)hashtable);
  636. #endif /* sequent */
  637. }
  638.  
  639. /* ----------------------------------------------------------------
  640.  *    ExecHashGetBucket
  641.  *
  642.  *    Get the hash value for a tuple
  643.  * ----------------------------------------------------------------
  644.  */
  645. int
  646. ExecHashGetBucket(hashtable, econtext, hashkey)
  647.     HashJoinTable hashtable;
  648.     ExprContext   econtext;
  649.     Var       hashkey;
  650. {
  651.     int     bucketno;
  652.     HashBucket     bucket;
  653.     Const     hashvalue;
  654.     Datum     keyval;
  655.     extern Boolean execConstByVal;
  656.     extern int execConstLen;
  657.     Boolean isNull;
  658.  
  659.  
  660.     /* ----------------
  661.      *    Get the join attribute value of the tuple
  662.      * ----------------
  663.      */
  664.     keyval = ExecEvalVar(hashkey, econtext, &isNull);
  665.     
  666.     /* ------------------
  667.      *  compute the hash function
  668.      * ------------------
  669.      */
  670.     if (execConstByVal)
  671.         bucketno =
  672.         hashFunc((char *) &keyval, execConstLen) % hashtable->totalbuckets;
  673.     else
  674.         bucketno =
  675.         hashFunc((char *) keyval, execConstLen) % hashtable->totalbuckets;
  676. #ifdef HJDEBUG
  677.     if (bucketno >= hashtable->nbuckets)
  678.        printf("hash(%d) = %d SAVED\n", keyval, bucketno);
  679.     else
  680.        printf("hash(%d) = %d\n", keyval, bucketno);
  681. #endif
  682.  
  683.     return(bucketno);
  684. }
  685.  
  686. /* ----------------------------------------------------------------
  687.  *    ExecHashOverflowInsert
  688.  *
  689.  *    insert into the overflow area of a hash bucket
  690.  * ----------------------------------------------------------------
  691.  */
  692. void
  693. ExecHashOverflowInsert(hashtable, bucket, heapTuple)
  694.     HashJoinTable hashtable;
  695.     HashBucket       bucket;
  696.     HeapTuple       heapTuple;
  697. {
  698.     OverflowTuple otuple;
  699.     RelativeAddr  newend;
  700.     OverflowTuple firstotuple;
  701.     OverflowTuple lastotuple;
  702.  
  703.     firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
  704.     lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
  705. #ifdef sequent
  706.     if (ParallelExecutorEnabled()) {
  707.         /* -----------------
  708.          *  Here we assume that the current bucket lock has already
  709.      *  been acquried.  But we still need to lock the overflow lock.
  710.      *  This could be a potential bottleneck.
  711.      * -----------------
  712.      */
  713.     S_LOCK(&(hashtable->overflowlock));
  714.        }
  715. #endif
  716.     /* ----------------
  717.      *    see if we run out of overflow space
  718.      * ----------------
  719.      */
  720.     newend = (RelativeAddr)LONGALIGN(hashtable->overflownext + sizeof(*otuple)
  721.          + heapTuple->t_len);
  722.     if (newend > hashtable->bottom) {
  723. #ifdef sequent
  724.     elog(WARN, "hash table out of memory.");
  725. #else
  726.     elog(DEBUG, "hash table out of memory. expanding.");
  727.     /* ------------------
  728.      * XXX this is a temporary hack
  729.      * eventually, recursive hash partitioning will be 
  730.      * implemented
  731.      * ------------------
  732.      */
  733.     hashtable->readbuf = hashtable->bottom = 2 * hashtable->bottom;
  734.     hashtable = (HashJoinTable)repalloc(hashtable, hashtable->bottom+BLCKSZ);
  735.     if (hashtable == NULL) {
  736.         perror("repalloc");
  737.         elog(WARN, "can't expand hashtable.");
  738.       }
  739. #endif
  740.     }
  741.     
  742.     /* ----------------
  743.      *    establish the overflow chain
  744.      * ----------------
  745.      */
  746.     otuple = (OverflowTuple)ABSADDR(hashtable->overflownext);
  747.     hashtable->overflownext = newend;
  748. #ifdef sequent
  749.     /* ------------------
  750.      *  unlock the overflow chain
  751.      * ------------------
  752.      */
  753.     if (ParallelExecutorEnabled())
  754.     S_UNLOCK(&(hashtable->overflowlock));
  755. #endif
  756.     if (firstotuple == NULL)
  757.     bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
  758.     else {
  759.     lastotuple->next = RELADDR(otuple);
  760.     bucket->lastotuple = RELADDR(otuple);
  761.      }
  762.     
  763.     /* ----------------
  764.      *    copy the tuple into the overflow area
  765.      * ----------------
  766.      */
  767.     otuple->next = -1;
  768.     otuple->tuple = RELADDR(LONGALIGN(((char*)otuple + sizeof(*otuple))));
  769.     bcopy(heapTuple, ABSADDR(otuple->tuple), heapTuple->t_len);
  770. }
  771.  
  772. /* ----------------------------------------------------------------
  773.  *    ExecScanHashBucket
  774.  *
  775.  *    scan a hash bucket of matches
  776.  * ----------------------------------------------------------------
  777.  */
  778. HeapTuple
  779. ExecScanHashBucket(hjstate, bucket, curtuple, hjclauses, econtext)
  780.     HashJoinState     hjstate;
  781.     HashBucket         bucket;
  782.     HeapTuple         curtuple;
  783.     List         hjclauses;
  784.     ExprContext     econtext;
  785. {
  786.     HeapTuple         heapTuple;
  787.     bool         qualResult;
  788.     OverflowTuple     otuple = NULL;
  789.     OverflowTuple     curotuple;
  790.     TupleTableSlot    inntuple;
  791.     OverflowTuple    firstotuple;
  792.     OverflowTuple    lastotuple;
  793.     HashJoinTable    hashtable;
  794.  
  795.     hashtable = get_hj_HashTable(hjstate);
  796.     firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
  797.     lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
  798.     
  799.     /* ----------------
  800.      *    search the hash bucket
  801.      * ----------------
  802.      */
  803.     if (curtuple == NULL || curtuple < (HeapTuple)ABSADDR(bucket->bottom)) {
  804.         if (curtuple == NULL)
  805.         heapTuple = (HeapTuple)
  806.         LONGALIGN(ABSADDR(bucket->top));
  807.     else 
  808.         heapTuple = (HeapTuple)
  809.         LONGALIGN(((char*)curtuple+curtuple->t_len));
  810.     
  811.         while (heapTuple < (HeapTuple)ABSADDR(bucket->bottom)) {
  812.     
  813.         inntuple = (TupleTableSlot)
  814.         ExecStoreTuple((Pointer) heapTuple,    /* tuple to store */
  815.                    (Pointer) get_hj_HashTupleSlot(hjstate),
  816.                    /* slot */
  817.                    InvalidBuffer,    /* tuple has no buffer */
  818.                    false);        /* do not pfree this tuple */
  819.         
  820.         set_ecxt_innertuple(econtext, inntuple);
  821.         qualResult = ExecQual(hjclauses, econtext);
  822.         
  823.         if (qualResult)
  824.         return heapTuple;
  825.        
  826.         heapTuple = (HeapTuple)
  827.         LONGALIGN(((char*)heapTuple+heapTuple->t_len));
  828.     }
  829.     
  830.     if (firstotuple == NULL)
  831.         return NULL;
  832.     otuple = firstotuple;
  833.     }
  834.  
  835.     /* ----------------
  836.      *    search the overflow area of the hash bucket
  837.      * ----------------
  838.      */
  839.     if (otuple == NULL) {
  840.     curotuple = get_hj_CurOTuple(hjstate);
  841.     otuple = (OverflowTuple)ABSADDR(curotuple->next);
  842.     }
  843.  
  844.     while (otuple != NULL) {
  845.     heapTuple = (HeapTuple)ABSADDR(otuple->tuple);
  846.     
  847.     inntuple = (TupleTableSlot)
  848.         ExecStoreTuple((Pointer) heapTuple,      /* tuple to store */
  849.                (Pointer) get_hj_HashTupleSlot(hjstate), /* slot */
  850.                InvalidBuffer, /* SP?? this tuple has no buffer */
  851.                false);      /* do not pfree this tuple */
  852.  
  853.     set_ecxt_innertuple(econtext, inntuple);
  854.     qualResult = ExecQual(hjclauses, econtext);
  855.     
  856.     if (qualResult) {
  857.         set_hj_CurOTuple(hjstate, otuple);
  858.         return heapTuple;
  859.     }
  860.      
  861.     otuple = (OverflowTuple)ABSADDR(otuple->next);
  862.     }
  863.     
  864.     /* ----------------
  865.      *    no match
  866.      * ----------------
  867.      */
  868.     return NULL;
  869. }
  870.  
  871. /* ----------------------------------------------------------------
  872.  *    hashFunc
  873.  *
  874.  *    the hash function, copied from Margo
  875.  * ----------------------------------------------------------------
  876.  */
  877.  
  878. int
  879. hashFunc(key,len)
  880.     char *key;
  881.     int len;
  882. {
  883.     register unsigned int h;
  884.     register int l;
  885.     register unsigned char *k;
  886.  
  887.     /*
  888.      * If this is a variable length type, then 'k' points
  889.      * to a "struct varlena" and len == -1.
  890.      * NOTE:
  891.      * VARSIZE returns the "real" data length plus the sizeof the
  892.      * "vl_len" attribute of varlena (the length information).
  893.      * 'k' points to the beginning of the varlena struct, so
  894.      * we have to use "VARDATA" to find the beginning of the "real"
  895.      * data.
  896.      */
  897.     if (len == -1) {
  898.     l = VARSIZE(key) - sizeof(long);    /* 'varlena.vl_len' is a long*/
  899.     k = (unsigned char*) VARDATA(key);
  900.     } else {
  901.     l = len;
  902.     k = (unsigned char *) key;
  903.     }
  904.  
  905.     h = 0;
  906.     
  907.     /*
  908.      * Convert string to integer
  909.      */
  910.     while (l--) h = h * PRIME1 ^ (*k++);
  911.     h %= PRIME2;
  912.  
  913.     return (h);
  914. }
  915.  
  916. /* ----------------------------------------------------------------
  917.  *    ExecHashPartition
  918.  *
  919.  *    determine the number of batches needed for a hashjoin
  920.  * ----------------------------------------------------------------
  921.  */
  922.  
  923. int
  924. ExecHashPartition(node)
  925. Hash node;
  926. {
  927.     Plan    outerNode;
  928.     int        b;
  929.     int        pages;
  930.     int        ntuples;
  931.     int        tupsize;
  932.  
  933.     /*
  934.      * get size information for plan node
  935.      */
  936.     outerNode = get_outerPlan((Plan) node);
  937.     ntuples = get_plan_size(outerNode);
  938.     if (ntuples == 0) ntuples = 1000;
  939.     tupsize = get_plan_width(outerNode) + sizeof(HeapTupleData);
  940.     pages = ceil((double)ntuples * tupsize * FUDGE_FAC / BLCKSZ);
  941.  
  942.     /*
  943.      * if amount of buffer space below hashjoin threshold,
  944.      * return negative
  945.      */
  946.     if (ceil(sqrt((double)pages)) > HashTBSize)
  947.        return -1;
  948.     if (pages <= HashTBSize)
  949.        b = 0;  /* fit in memory, no partitioning */
  950.     else
  951.        b = ceil((double)(pages - HashTBSize)/(double)(HashTBSize - 1));
  952.  
  953.     return b;
  954. }
  955.  
  956. /* ----------------------------------------------------------------
  957.  *    ExecHashTableReset
  958.  *
  959.  *    reset hash table header for new batch
  960.  * ----------------------------------------------------------------
  961.  */
  962.  
  963. void
  964. ExecHashTableReset(hashtable, ntuples)
  965. HashJoinTable hashtable;
  966. int ntuples;
  967. {
  968.     int i;
  969.     HashBucket bucket;
  970.  
  971.     hashtable->nbuckets = hashtable->totalbuckets
  972.     = ceil((double)ntuples/NTUP_PER_BUCKET);
  973.  
  974.     hashtable->overflownext = hashtable->top + hashtable->bucketsize *
  975.                   hashtable->nbuckets;
  976.  
  977.     bucket = (HashBucket)ABSADDR(hashtable->top);
  978.     for (i=0; i<hashtable->nbuckets; i++) {
  979.        bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
  980.        bucket->bottom = bucket->top;
  981.        bucket->firstotuple = bucket->lastotuple = -1;
  982. #ifdef sequent
  983.         S_INIT_LOCK(&(bucket->bucketlock));
  984. #endif
  985.        bucket = (HashBucket)((char*)bucket + hashtable->bucketsize);
  986.     }
  987.    hashtable->pcount = hashtable->nprocess;
  988. }
  989.  
  990. static int hjtmpcnt = 0;
  991.  
  992. void
  993. mk_hj_temp(tempname)
  994. char *tempname;
  995. {
  996.     sprintf(tempname, "HJ%d.%d", getpid(), hjtmpcnt);
  997.     hjtmpcnt = (hjtmpcnt + 1) % 1000;
  998. }
  999.  
  1000. print_buckets(hashtable)
  1001. HashJoinTable hashtable;
  1002. {
  1003.    int i;
  1004.    HashBucket bucket;
  1005.    bucket = (HashBucket)ABSADDR(hashtable->top);
  1006.    for (i=0; i<hashtable->nbuckets; i++) {
  1007.       printf("bucket%d = (top = %d, bottom = %d, firstotuple = %d, lastotuple = %d)\n", i, bucket->top, bucket->bottom, bucket->firstotuple, bucket->lastotuple);
  1008.       bucket = (HashBucket)((char*)bucket + hashtable->bucketsize);
  1009.     }
  1010. }
  1011.